This repository has been archived by the owner on Sep 13, 2018. It is now read-only.

Avoid quadratic complexity in poll() #155

Status: Open. jannic wants to merge 1 commit into master.

Conversation

@jannic (Contributor) commented Mar 19, 2017

streaming::pipeline::server::Dispatch::poll() called poll on every
single entry of self.in_flight, and then removed at most one
element from that structure (via pop_front()).

So, the complexity is O(n^2) in the size of self.in_flight.

Calling poll() on the first entry of self.in_flight should be
sufficient, the other entries can be handled later. This reduces the
complexity to O(n).

This overhead becomes significant if self.in_flight gets large, which can
happen if input is read very quickly. I triggered it with a simple
echo server fed from a Unix domain socket:

$ time yes | head -n 1000 | nc -U -N /tmp/sock >/dev/null
real 0m0.051s

$ time yes | head -n 10000 | nc -U -N /tmp/sock >/dev/null
real 0m3.534s

$ time yes | head -n 20000 | nc -U -N /tmp/sock >/dev/null
real 0m13.883s

With this patch, the runtime becomes linear and much faster:

$ time yes | head -n 1000 | nc -U -N /tmp/sock >/dev/null
real 0m0.010s

$ time yes | head -n 10000 | nc -U -N /tmp/sock >/dev/null
real 0m0.049s

$ time yes | head -n 20000 | nc -U -N /tmp/sock >/dev/null
real 0m0.084s

$ time yes | head -n 100000 | nc -U -N /tmp/sock >/dev/null
real 0m0.405s

$ time yes | head -n 1000000 | nc -U -N /tmp/sock >/dev/null
real 0m3.738s
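
To make the complexity argument above concrete, here is a toy, self-contained model (not tokio-proto code; the InFlight type is purely illustrative) that counts poll() calls under both strategies, with every fake future completing after a single poll:

    use std::collections::VecDeque;

    struct InFlight {
        done: bool,
    }

    impl InFlight {
        fn poll(&mut self) -> bool {
            self.done = true;
            self.done
        }
    }

    // Old behavior: poll every entry, then pop at most one finished entry from the front.
    fn drain_poll_all(mut q: VecDeque<InFlight>) -> u64 {
        let mut polls = 0;
        while !q.is_empty() {
            for slot in q.iter_mut() {
                slot.poll();
                polls += 1;
            }
            if q.front().map(|s| s.done).unwrap_or(false) {
                q.pop_front();
            }
        }
        polls
    }

    // Proposed behavior: poll only the front entry on each pass.
    fn drain_poll_front(mut q: VecDeque<InFlight>) -> u64 {
        let mut polls = 0;
        while !q.is_empty() {
            if let Some(front) = q.front_mut() {
                polls += 1;
                if front.poll() {
                    q.pop_front();
                }
            }
        }
        polls
    }

    fn main() {
        let n = 10_000;
        let mk = || (0..n).map(|_| InFlight { done: false }).collect::<VecDeque<_>>();
        // Roughly n*(n+1)/2 polls versus exactly n polls.
        println!("poll all:   {} polls", drain_poll_all(mk()));
        println!("poll front: {} polls", drain_poll_front(mk()));
    }

In the real server the futures are response futures and poll ordering matters, but the counting shows why the wall-clock times above grow quadratically.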

@carllerche (Member) commented:

The issue is that poll is needed to advance the state of each in-flight future. Otherwise, the later ones can't make any progress until the prior ones complete.

That being said, the current implementation is not ideal. The best way to handle this is to use with_unpark_event (https://docs.rs/futures/0.1.11/futures/task/fn.with_unpark_event.html). This allows only polling entries that we know are ready to be polled.

The reason I didn't just do it is that it requires extra allocations + bookkeeping. So, you only want to switch to the more complex handling when there are enough in-flight requests to make it worth it.
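
For reference, a rough sketch of how the futures 0.1 with_unpark_event API can be used for this; ReadySet and poll_slot are hypothetical names, not part of futures-rs or this PR. The event registered around a poll tags any wakeup caused by that future with the entry's id, so a later Dispatch::poll only has to revisit the recorded ids:

    extern crate futures; // futures 0.1

    use std::sync::{Arc, Mutex};
    use futures::{Future, Poll};
    use futures::task::{self, EventSet, UnparkEvent};

    // Hypothetical EventSet recording which in-flight entries were woken up.
    struct ReadySet {
        woken: Mutex<Vec<usize>>,
    }

    impl EventSet for ReadySet {
        fn insert(&self, id: usize) {
            self.woken.lock().unwrap().push(id);
        }
    }

    // Poll one entry with an unpark event registered; must be called from inside a task.
    // When the future later unparks the task, `id` is also inserted into the ReadySet.
    fn poll_slot<F: Future>(ready: &Arc<ReadySet>, id: usize, fut: &mut F) -> Poll<F::Item, F::Error> {
        let event = UnparkEvent::new(ready.clone(), id);
        task::with_unpark_event(event, || fut.poll())
    }

The extra allocation and bookkeeping mentioned above come, roughly, from the shared Arc and from registering an event on every poll.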

@jannic (Contributor, Author) commented Mar 20, 2017

Yes, I understand why it may be desirable to trigger in-flight objects as early as possible, in case they can do some processing in the background.
But (IMHO) in most cases, if the pipeline is bandwidth-limited at its output, it doesn't make a huge difference when the processing on the input side happens. And as long as the output is not blocking, read_out_frames() will call poll() again, in a rather short loop. Or am I missing something?
I admit I didn't do any benchmarks besides the one given above, so with this patch, some workloads may become slower than before.
Perhaps some middle ground would be optimal? Something like:

        for slot in self.in_flight.iter_mut().take(10) {
            slot.poll();
        }

This also fixes the issue I encountered, while still triggering more than just the first in-flight entry. But the number 10 is just an arbitrary constant, which is not very elegant either.

@carllerche (Member) commented:

Here is the situation:

Request 1 comes in, and sleeps for 30 seconds.
Request 2 comes in, and the handler issues a DB request. With the proposed change, the DB request would not be started until request 1 is fully processed.

Anyway, the point is that the correct fix, as mentioned, is to poll only the futures that we know for certain need to be polled. To do this, we need to use with_unpark_event. Otherwise, I would opt to keep the current behavior.

Also, I'd say that there should be a default max in-flight setting of 100.

@jannic (Contributor, Author) commented Mar 20, 2017

Some arbitrary limit like 10 or 100 would mitigate the issue, for sure. With an echo server on a Unix domain socket, I saw more than 1,000,000 entries in the in-flight queue, given enough input. Perhaps that's even the core issue. And if the queue length is limited to some sane value, it doesn't matter too much whether the queue is traversed in O(n) or O(n^2) time.

@carllerche (Member) commented:

Again, the correct way to solve the original issue would be to use with_unpark_event. Only polling the first future in the in-flight list would result in incorrect behavior.

@jannic (Contributor, Author) commented Mar 20, 2017

Incorrect in the sense that it may be (possibly much) slower, right? Or does it break some contracts/invariants which I don't know about? (Just trying to understand how the crate works.)

The updated commit message:

streaming::pipeline::server::Dispatch::poll() called poll on every
single entry of self.in_flight, and then removed at most one
element from that structure (via pop_front()).

So, the complexity was O(n^2) in the size of self.in_flight.

Instead, call poll only once for a new in_flight entry, and then only
when an unpark event was received.

@jannic (Contributor, Author) commented Mar 20, 2017

What do you think about this patch?
(This definitely needs a thorough review; I'm rather new to Rust, and this is a more complex change than I'm comfortable with.)
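
Since the diff itself isn't quoted in this thread, here is a rough sketch of the strategy the updated commit message describes, building on the hypothetical ReadySet/poll_slot sketch above. The struct shape, field names, and id bookkeeping are illustrative assumptions and differ in detail from the actual patch:

    // (Arc, Future, ReadySet, and poll_slot come from the sketch above.)
    use std::collections::VecDeque;
    use futures::Async;

    struct Dispatch<F> {
        ready: Arc<ReadySet>,
        in_flight: VecDeque<(usize, F, bool)>, // (id, response future, done)
        next_id: usize,                        // id assigned to the next dispatched future
        polled_to: usize,                      // ids below this already got their initial poll
    }

    impl<F: Future<Item = (), Error = ()>> Dispatch<F> {
        fn dispatch(&mut self, fut: F) {
            self.in_flight.push_back((self.next_id, fut, false));
            self.next_id += 1;
        }

        // Called from the task driving the transport.
        fn poll_in_flight(&mut self) {
            let front_id = match self.in_flight.front() {
                Some(&(id, _, _)) => id,
                None => return,
            };

            // New entries get exactly one initial poll; after that, an entry is polled
            // again only if its unpark event fired since the last call.
            let mut to_poll: Vec<usize> = (self.polled_to..self.next_id).collect();
            self.polled_to = self.next_id;
            to_poll.extend(self.ready.woken.lock().unwrap().drain(..));

            for id in to_poll {
                if id < front_id {
                    continue; // already completed and removed
                }
                // Ids are handed out in FIFO order, so the queue index is id minus the
                // front entry's id (the kind of offset an event_offset style field tracks).
                if let Some(&mut (_, ref mut fut, ref mut done)) =
                    self.in_flight.get_mut(id - front_id)
                {
                    if !*done {
                        match poll_slot(&self.ready, id, fut) {
                            Ok(Async::NotReady) => {}
                            _ => *done = true, // ready or errored
                        }
                    }
                }
            }

            // Responses must be written in request order, so only finished entries at
            // the front of the queue are removed.
            while self.in_flight.front().map(|&(_, _, done)| done).unwrap_or(false) {
                self.in_flight.pop_front();
            }
        }
    }

This ignores error propagation and shutdown handling; it is only meant to show where the unpark events and the offset bookkeeping fit.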

@jannic (Contributor, Author) commented Mar 21, 2017

About the test failure on nightly, shown here: https://travis-ci.org/tokio-rs/tokio-proto/jobs/213217871
I can reproduce the same failure with the current master version of tokio-proto, 0a7d229. It doesn't happen every time, but running cargo test in a loop produced the same message after 20 tries.
It happens with both nightly and stable versions of Rust.

---- test_reaching_max_in_flight_requests stdout ----
	thread 'test_reaching_max_in_flight_requests' panicked at 'assertion failed: `(left == right)` (left: `32`, right: `33`)', tests/test_multiplex_server.rs:261
note: Run with `RUST_BACKTRACE=1` for a backtrace.

Seems to be a known bug: #114

@carllerche (Member) commented:

Sorry for the delay.

Re: "incorrect", I would say it is a bit of both. The pipeline's behavior is to process response futures concurrently. Given that futures are supposed to be lazy (they do no work until they are polled), not polling them means that this behavior isn't followed.

Re: the new PR, I did a quick skim, but I'm going to need to read it more closely before I provide feedback :)

@carllerche (Member) commented:

At a glance, the implementation seems plausible. I'm not really following why event_offset is needed.

It would also be nice to have a minimum number of in_flight requests before switching to with_unpark_event; this would avoid the allocation in most cases.

Also, it seems to me that this functionality could be extracted into something that can be reused... I'm not sure if that is worth it yet.

Review comment on this part of the diff:

    for slot in self.in_flight.iter_mut() {
        slot.poll();

    // new Futures must be polled once to start processing
    if self.polled_to < self.in_flight.len() {

@carllerche (Member) commented:

The initial poll could probably be handled before inserting it into the in_flight structure.

@jannic (Contributor, Author) replied:

Yes, it could easily be moved to fn dispatch, a few lines up in the same file.
I just wasn't sure whether that would break any non-obvious assumption, so I preferred to keep the poll in the same place as before, for now.

@jannic (Contributor, Author) commented Mar 31, 2017

If we start out calling poll without with_unpark_event, can we later switch to with_unpark_event by just calling poll on the same future again? Or would we have to keep track of whether a future has an unpark event or not, forever?
Even if it's not possible to switch over later, the strategy could be as easy as:

  • always poll the first X in_flight events on each call to Dispatch::poll
  • only use with_unpark_event when more than X events are in flight.
    Worst case would be that an unpark event would be set on a future, then the size of the queue decreases, and then the future would be polled unnecessarily because it became one of the first X events in the queue.
    Not sure how to choose X, as the optimal value of X heavily depends on the implementation of poll() of the given future. If poll() on a non-ready future is expensive, X should be small. If it's cheap, but allocations are expensive, a very large X could be optimal.
    However, even with a (reasonably) large X, quadratic complexity would be avoided, so we could choose some large value to fix this issue while keeping the short-queue case essentially as it was before.

(Updated comment: The proposed strategy should be possible, as we never switch from 'poll unconditionally' to 'use with_unpark_event' for any one future. The opposite switch should always be allowed, AFAICT.)
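
A sketch of that hybrid strategy, again reusing the hypothetical ReadySet and poll_slot from the earlier sketches. X = 32 is an arbitrary placeholder, and newly dispatched entries are assumed to get their initial poll elsewhere, as in the Dispatch sketch above:

    use std::collections::VecDeque;

    const X: usize = 32; // arbitrary threshold

    // Must run inside a task context. `fired` holds the ids drained from ready.woken
    // before this call; completed entries are popped by the caller, in order.
    fn poll_hybrid<F: Future>(
        ready: &Arc<ReadySet>,
        fired: &[usize],
        in_flight: &mut VecDeque<(usize, F)>, // (stable id, response future)
    ) {
        for (pos, &mut (id, ref mut fut)) in in_flight.iter_mut().enumerate() {
            if pos < X {
                // Below the threshold: poll unconditionally, with no extra allocation
                // or bookkeeping, exactly like the current code does for every entry.
                let _ = fut.poll();
            } else if fired.contains(&id) {
                // Beyond the threshold: poll only entries whose wakeup was recorded,
                // registering a fresh unpark event for the next wakeup.
                let _ = poll_slot(ready, id, fut);
            }
        }
    }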

@jannic (Contributor, Author) commented Mar 31, 2017

I did some simple benchmarks to find out what could be a reasonable threshold for starting to use with_unpark_event (the X from my comment above).
It looks like, for a trivial poll() function, optimal throughput was reached at a threshold of about 32 in-flight futures.

This is not an exact value at all. The optimum seems to be somewhere in the range of 32 to about 300, so a larger value could be used as well. But because lower values quickly become better choices for a more expensive poll function, I'd tend toward the lower end of that range.

For reference, this is the test code I used to do the benchmark:

#[test]
fn test_long_pipeline() {
    let service = simple_service(|req: Message<&'static str, Body<u32, io::Error>>| {
        assert_eq!(req, "hello");
        future::finished(Message::WithoutBody(*req.get_ref()))
    });

    let (mut mock, _other) = mock::pipeline_server(service);
    for _ in 0..1000000 {
        mock.send(msg("hello"));
    }
    for _ in 0..1000000 {
        assert_eq!(mock.next_write().unwrap_msg(), "hello");
    }
    mock.allow_and_assert_drop();
}

On the (rather old) notebook I tested it on, the test took about 7 seconds when using with_unpark_event all the time, about 4.5s in the optimal range mentioned above, and several minutes(!) when with_unpark_event was not used at all.

@carllerche (Member) commented:

I actually started looking more into with_unpark_event and have found that it has a number of inefficiencies. I'm going to investigate whether that can be improved, and I briefly discussed with @alexcrichton moving this logic into futures-rs as something like FutureQueue.

@carllerche (Member) commented:

welp, this entire topic drove me to shave some yaks: rust-lang/futures-rs#436

@carllerche (Member) commented:

Just a quick update. I didn't forget this PR. As mentioned above, I think the right way to handle it is to extract the needed logic into a standalone type in futures-rs. However, that sent me into the yak shave of improving the futures-rs task system...

Once we get out of that, I will try to extract the logic that is generally useful from this PR into futures-rs.
